package com.fly.docker.service;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.fly.docker.thread.PingCallable;
import com.fly.docker.thread.PingRunable;
import com.spotify.docker.client.DockerClient;
import com.spotify.docker.client.DockerClient.LogsParam;
import com.spotify.docker.client.LogStream;
import com.spotify.docker.client.exceptions.DockerException;
import com.spotify.docker.client.messages.ContainerConfig;
import com.spotify.docker.client.messages.ContainerCreation;

import lombok.extern.slf4j.Slf4j;

/**
 * DockerService
 */
@Slf4j
@Service
public class DockerService
{
    private Map<String, DockerClient> dockerclients = new TreeMap<>();
    
    private ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new BasicThreadFactory.Builder().namingPattern("t-%03d").daemon(true).priority(Thread.MAX_PRIORITY).build());
    
    /**
     * 新增活跃节点
     * 
     * @param dockerClient
     * @return
     */
    public Map<String, DockerClient> addNew(DockerClient dockerClient)
    {
        dockerclients.put(dockerClient.getHost(), dockerClient);
        return dockerclients;
    }
    
    /**
     * 返回DockerClient资源池
     * 
     * @return
     */
    public Map<String, DockerClient> getClients()
    {
        return dockerclients;
    }
    
    /**
     * 选择活跃Docker服务
     * 
     * @return
     * @throws InterruptedException
     */
    @Deprecated
    private DockerClient choose()
        throws InterruptedException
    {
        // ArrayList 一定概率有null值，原因:通过new ArrayList<>()初始化的大小是0，首次插入触发扩容，并发可能导致出现null值
        List<DockerClient> liveServers = new CopyOnWriteArrayList<>();
        dockerclients.values().stream().forEach(client -> executorService.execute(new PingRunable(client, liveServers)));
        
        // 1000ms内未ping通，即认为不活跃
        int index = 0;
        while (liveServers.isEmpty() && (index++) < 50)
        {
            TimeUnit.MILLISECONDS.sleep(20);
        }
        if (liveServers.isEmpty())
        {
            throw new RuntimeException("未检测到活跃DockerClient，请先初始化");
        }
        return liveServers.get(0);
    }
    
    /**
     * 自动返回首个活跃节点（executorService.invokeAny实现）
     * 
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws DockerException
     */
    public DockerClient invokeAny()
        throws InterruptedException, ExecutionException, TimeoutException, DockerException
    {
        if (dockerclients.isEmpty())
        {
            throw new DockerException("未检测到活跃DockerClient，请先初始化");
        }
        // lambda写法
        List<PingCallable> tasks = dockerclients.values().stream().map(client -> new PingCallable(client)).collect(Collectors.toList());
        return executorService.invokeAny(tasks, 1000, TimeUnit.MILLISECONDS);
    }
    
    /**
     * 根据index选择Docker服务
     * 
     * @param index
     * @return
     * @throws DockerException
     * @throws InterruptedException
     */
    public DockerClient choose(int index)
        throws DockerException, InterruptedException
    {
        String[] keys = dockerclients.keySet().toArray(new String[0]);
        if (keys.length == 0)
        {
            throw new DockerException("未检测到活跃DockerClient，请先初始化");
        }
        if (index < 0 || index >= keys.length)
        {
            throw new DockerException("请输入正确的index");
        }
        String key = keys[index];
        DockerClient dockerClient = dockerclients.get(key);
        if (!StringUtils.equals("OK", dockerClient.ping()))
        {
            throw new DockerException("未检测到活跃DockerClient");
        }
        return dockerClient;
    }
    
    /**
     * 异步调用Docker
     * 
     * @throws DockerException
     * @throws InterruptedException
     */
    @Async
    public void process()
        throws DockerException, InterruptedException
    {
        DockerClient dockerClient = choose();
        // 拉取image
        String imageName = "registry.cn-shanghai.aliyuncs.com/00fly/docker-client-show:0.0.1";
        dockerClient.pull(imageName);
        
        // 创建并启动
        ContainerConfig containerConfig = ContainerConfig.builder().image(imageName).build();
        ContainerCreation container = dockerClient.createContainer(containerConfig);
        dockerClient.startContainer(container.id());
        printContainLog(dockerClient, container.id());
        
        // 停止容器
        TimeUnit.SECONDS.sleep(10);
        dockerClient.stopContainer(container.id(), 5);
        printContainLog(dockerClient, container.id());
        
        // 移除容器
        TimeUnit.SECONDS.sleep(5);
        dockerClient.removeContainer(container.id());
    }
    
    /**
     * 打印日志
     * 
     * @param dockerClient
     * @param containerId
     * @throws DockerException
     * @throws InterruptedException
     */
    private void printContainLog(DockerClient dockerClient, String containerId)
        throws DockerException, InterruptedException
    {
        try (LogStream stream = dockerClient.logs(containerId, LogsParam.stdout(), LogsParam.stderr()))
        {
            String logs = stream.readFully();
            log.info("{}", logs);
        }
    }
}
